
Add GPU ArrayAggregate for SUM/PRODUCT/MAX/MIN/ALL/ANY #14652

Open
thirtiseven wants to merge 12 commits into NVIDIA:main from thirtiseven:array_aggregate

Conversation

@thirtiseven
Collaborator

@thirtiseven thirtiseven commented Apr 23, 2026

Contributes to #8532.

What ArrayAggregate does

ArrayAggregate (SQL aggregate / reduce) is a 4-arg HOF:

aggregate(arr, zero, (acc, x) -> merge_expr, acc -> finish_expr)

Semantically a fold: starting from zero, each element x is folded into the accumulator via merge, and finish transforms the final accumulator (defaults to identity). Any lambda is allowed, so the CPU implementation is a sequential per-element loop — there is no general GPU mapping.
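
For intuition, the CPU semantics in a few lines of plain Scala (a sketch, not the plugin's code):

```scala
// Sequential left fold from zero, then finish on the final accumulator.
def cpuAggregate[A, B](arr: Seq[A], zero: B,
                       merge: (B, A) => B,
                       finish: B => B): B =
  finish(arr.foldLeft(zero)(merge))

// aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x) evaluates to 6:
assert(cpuAggregate(Seq(1, 2, 3), 0, (acc: Int, x: Int) => acc + x, (b: Int) => b) == 6)
```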

Motivation. A customer workload using aggregate(filter(arr, …), 0, (acc, z) -> acc + CASE WHEN <predicate> THEN 1 ELSE 0 END) currently falls back entirely because ArrayAggregate has no GPU implementation. #8532 lists the candidate strategies (cuDF AST, PTX UDF, pattern rewrite); this PR takes the pattern-rewrite path with a small extensible AggOp trait.

Approach

A general fold can't run on the GPU, but in practice almost every real aggregate lambda has the shape

(acc, x) -> op(acc, g(x))

where op is associative + commutative (Add / Multiply / Greatest / Least / And / Or) and g(x) only depends on x. With finish as identity, this is equivalent to:

result = (op-reduce over [g(x) for x in arr]) op zero

g(x) parallelises element-wise; op maps to cuDF's listReduce (segmented reduction). At plan time we pattern-match the lambda; matched shapes run on the GPU, everything else falls back to CPU.
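
The equivalence is easy to sanity-check in plain Scala (illustrative only; it relies on op being associative and commutative):

```scala
// Fold form: what Spark's CPU loop computes.
def foldForm[A, B](arr: Seq[A], zero: B, op: (B, B) => B, g: A => B): B =
  arr.foldLeft(zero)((acc, x) => op(acc, g(x)))

// Map + reduce form: what the GPU rewrite computes (g element-wise,
// then a segmented reduction, then one combine with zero).
def mapReduceForm[A, B](arr: Seq[A], zero: B, op: (B, B) => B, g: A => B): B =
  arr.map(g).reduceOption(op).fold(zero)(op(_, zero))

// e.g. the count-if pattern: op = +, g(x) = if (pred) 1 else 0
val arr  = Seq(1, 5, 2, 8, 3)
val g    = (x: Int) => if (x > 2) 1 else 0
val plus = (a: Int, b: Int) => a + b
assert(foldForm(arr, 0, plus, g) == mapReduceForm(arr, 0, plus, g)) // both 3
```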

Components

| Component | Role |
|---|---|
| AggOp (sealed trait) + 6 case objects (SumOp, ProductOp, MaxOp, MinOp, AllOp, AnyOp) | Each owns its cuDF segmented-reduction aggregation, null policy, identity scalar, combine-with-zero step, and Catalyst shape matcher (Add → SumOp, Greatest(a,b) → MaxOp, …). Adding a new op is one case object plus an entry in allOps. |
| ArrayAggregateDecomposer.decompose(merge, finish, argType, zeroType): Either[String, Decomposition] | Single source of truth for "is this shape ever GPU-able?": checks the lambda shape, that finish is identity, op type support, g.dataType == zeroType, and the ALL/ANY null-element guard. Returns Left(reason) for any mismatch. |
| GpuArrayAggregateMeta | Plan-translation bridge: forwards decompose's Left to willNotWorkOnGpu(reason), picks the GPU sub-meta for g(x) by index (gIsLeftOfMergeBody) instead of walking the meta tree. |
| GpuArrayAggregate | The runtime expression. |
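
A rough sketch of the AggOp contract these rows describe (member signatures paraphrased from the table, not copied from the source):

```scala
import ai.rapids.cudf.{ColumnVector, DType, NullPolicy, Scalar, SegmentedReductionAggregation}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType

// Paraphrased contract; the real trait also handles the scalar-zero
// fast path described under "Runtime pipeline".
sealed trait AggOp {
  def cudfAgg: SegmentedReductionAggregation // segmented reduction to run
  def nullPolicy: NullPolicy                 // INCLUDE or EXCLUDE
  def identityScalar(dt: DataType): Scalar   // substituted for empty segments
  def combineWithZero(reduced: ColumnVector, // folds the zero argument in once
                      zero: ColumnVector,
                      outType: DType): ColumnVector
  def matchShape(merge: Expression): Option[Expression] // Some(g) on a match
  def supportsType(dt: DataType): Boolean
}
// Six case objects implement this: SumOp, ProductOp, MaxOp, MinOp, AllOp, AnyOp.
```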

GpuArrayTransformBase was split: it now holds only the explode + lambda-projection plumbing shared across the array HOFs. GpuArrayTransform / GpuArrayExists / GpuArrayFilter moved to a new GpuArrayElementWiseTransform sub-trait that owns transformListColumnView and the standard columnarEval. GpuArrayAggregate extends the smaller base directly and writes its own columnarEval.

Runtime pipeline

For each input batch:

  1. Eval g(x) + segmented reduce. Reuse GpuArrayTransformBase's explode path to project g(x) over the array children, rewrap as list<g_type> with the original offsets/validity, and call listReduce(op.cudfAgg, op.nullPolicy, outDType).
  2. Substitute identity for empty/no-contributing rows. cuDF returns null for empty segments (and for all-null segments under EXCLUDE). substituteMask builds a mask of those rows and ifElses in op.identityScalar(zeroType) so the next step sees a sane value.
  3. Combine with zero. result = op.combineWithZero(adjusted, zero, outDType). When zero is a GpuLiteral we pass a cudf.Scalar directly (skips the per-batch column broadcast); otherwise we evaluate zero as a column.
  4. Restore null for null-list rows. cuDF's NULL_MAX / NULL_MIN / LOGICAL_AND / LOGICAL_OR don't propagate null the Spark 3VL way, so the final NullUtilities.mergeNulls re-applies arg's null mask. Skipped outright when arg has no nulls.

Each step releases the previous step's intermediate GPU column via withResource chaining, so the exploded batch (typically the largest allocation) doesn't pin memory through the whole pipeline.
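
Schematically, with the Arm helper inlined so the sketch stands alone and a stand-in Col type in place of a GPU column:

```scala
// Inlined stand-in for spark-rapids' Arm.withResource.
def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
  try block(r) finally r.close()

class Col(name: String) extends AutoCloseable {
  def next(n: String) = new Col(n)
  override def close(): Unit = println(s"closed $name")
}

// Each withResource closes the previous step's column as soon as the
// next step's result exists, so intermediates are freed progressively.
val step1 = new Col("reduced")                      // step 1: listReduce output
val step2 = withResource(step1)(_.next("adjusted")) // step 2: closes "reduced"
val step3 = withResource(step2)(_.next("combined")) // step 3: closes "adjusted"
withResource(step3)(_.next("final")).close()        // step 4: closes "combined"
```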

Null semantics

  • SUM / PRODUCT / ALL / ANY use NullPolicy.INCLUDE — one null element poisons the row, matching Spark's iterative acc op null = null.
  • MAX / MIN use NullPolicy.EXCLUDE — null elements are skipped, matching Spark's Greatest / Least.
  • MAX / MIN combineWithZero uses BinaryOp.NULL_MAX / NULL_MIN (the same primitive GpuGreatest / GpuLeast use), so step 3 is one kernel rather than greaterThan + ifElse.
  • Null input lists always map to null output via the step-4 mergeNulls.
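
These rules can be sanity-checked against CPU Spark from a spark-shell (expected results per the semantics above):

```scala
// One null element poisons SUM (INCLUDE-style): result is NULL.
spark.sql(
  "SELECT aggregate(array(1, NULL, 3), 0, (acc, x) -> acc + x)"
).show() // NULL

// greatest skips nulls (EXCLUDE-style MAX): result is 3.
spark.sql(
  "SELECT aggregate(array(1, NULL, 3), 0, (acc, x) -> greatest(acc, x))"
).show() // 3
```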

What falls back to CPU (and why)

All of these are decided in the decomposer and surface as a willNotWorkOnGpu reason; a dispatch sketch follows the table:

| Limitation | Reason |
|---|---|
| g(x) references acc | g must parallelise across elements; an acc reference re-introduces a sequential dependence. |
| g.dataType != zeroType | Spark's checkInputDataTypes requires merge.dataType == zero.dataType; we honour that to avoid silent recasts. |
| MAX / MIN on FLOAT / DOUBLE | cuDF's fmax(NaN, x) = x (NaN absorbed) vs Spark's Double.compare (NaN propagates). |
| ALL / ANY on null-bearing arrays | cuDF's INCLUDE-null all/any returns null on any null; Spark's 3VL short-circuits (false AND null = false, true OR null = true). |
| PRODUCT on DECIMAL | Would need DecimalUtils.multiplyDecimals for overflow handling; SUM on DECIMAL is fine and supported. |
| Non-identity finish | Out of scope for this pass. |
| Subtract / Divide / 3-ary Greatest / etc. | op must be one of the registered associative + commutative shapes. |
| Element type outside commonCudfTypes + DECIMAL_128 + NULL + BINARY + STRUCT | Limited by GpuArrayTransformBase's explode path. BINARY support added here matches PR #14618. |
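
In plan-translation terms, that dispatch is a single Either match (a sketch using the names quoted above, not the verbatim source):

```scala
// Sketch: decompose's Either drives GPU-vs-fallback tagging.
ArrayAggregateDecomposer.decompose(merge, finish, argType, zeroType) match {
  case Left(reason) => willNotWorkOnGpu(reason) // surfaces as the fallback message
  case Right(d)     => decomposition = Some(d)  // op + g(x) kept for convertToGpuImpl
}
```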

Tests

  • Unit (ArrayAggregateDecomposerSuite) — 24 cases covering each op's positive shape, commutation, Cast(acc, …) unwrap, complex g, and every rejection path the decomposer owns (wrong shape, non-identity finish, g references acc, type mismatch, NaN-affected MAX, ALL on null-bearing arrays).
  • Integration (higher_order_functions_test.py) — happy paths for each op (numeric / native int / boolean), structural variations (count-if pattern, filter+aggregate composed pattern, non-zero init, null/empty array, lambda referencing outer column, zero from outer column, struct field, BINARY element via length(x), decimal SUM, long-overflow wrap), plus explicit fallback tests for every rejection reason listed above.

ArrayAggregate perf

500000 rows × array(len≈100), 200 replicas/query, avg of 3 runs (warmup × 1)

| Scenario | CPU (s) | GPU (s) | Speedup |
|---|---|---|---|
| SUM (cast) | 36.878 | 15.511 | 2.38x |
| count-if (CASE WHEN) | 36.503 | 16.415 | 2.22x |
| count-if (if shape) | 36.377 | 16.196 | 2.25x |
| MAX (integral) | 38.964 | 15.250 | 2.56x |
| CaseWhen multi-branch | 36.405 | 16.417 | 2.22x |

Checklists

Documentation

  • Updated for new or modified user-facing features or behaviors
  • No user-facing change

Testing

  • Added or modified tests to cover new code paths
  • Covered by existing tests
  • Not required

Performance

  • Tests ran and results are added in the PR description
  • Issue filed with a link in the PR description
  • Not required

thirtiseven and others added 3 commits April 21, 2026 15:29
Implements ArrayAggregate on the GPU for lambdas decomposable as
(acc, x) -> acc + g(x) with an identity finish. Other shapes fall back
to the CPU.

- ArrayAggregateDecomposer: match merge body against Add(acc, g),
  unwrap Cast on the acc side, validate finish is identity
- GpuArrayAggregate: evaluate g(x) via the existing
  GpuArrayTransformBase explode path, then listReduce + combine with
  zero. Uses NullPolicy.INCLUDE so null elements poison the sum, matching
  Spark's iterative `acc + null = null` semantics. Empty (non-null) lists
  are substituted with op's identity before the add-zero step; null
  lists stay null and propagate.
- Decimal identity scalar is bound to the column's DType (via
  Scalar.fromDecimal(BigInteger, DType)) so ifElse / add don't trip on
  DECIMAL32-vs-DECIMAL128 width mismatches.
- Unit tests for the decomposer and integration tests covering the
  client pattern, null / empty arrays, non-zero init, outer-column refs,
  struct-field access, long overflow, decimal sum, and fallback cases.

Addresses part of NVIDIA#8532.
A follow-up refactor will introduce a normalize pass and AggOp trait
to support PRODUCT / MIN / MAX / AND / OR and Cast stripping.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@greptile-apps
Contributor

greptile-apps Bot commented Apr 23, 2026

Greptile Summary

This PR implements GpuArrayAggregate for the SQL aggregate/reduce HOF, supporting SUM, PRODUCT, MAX, MIN, ALL, and ANY via a pattern-rewrite decomposer that maps eligible lambdas to cuDF segmented reductions. All previously identified issues from the review thread (float/double NaN for MAX/MIN, float SUM ordering divergence, nullable contract) have been addressed: floats are excluded from ExtremumOp, variableFloatAgg.enabled gates SUM/PRODUCT on floats, ALL/ANY fall back for nullable arrays, and nullable unconditionally returns true.

Confidence Score: 5/5

Safe to merge; all previously identified P0/P1 issues have been addressed and only P2 defensive-coding concerns remain.

All items from the prior review thread (NaN semantics for MAX/MIN, float SUM ordering, nullable correctness, ALL/ANY 3VL) are resolved. Two new P2 findings: convertToGpuImpl throws IllegalStateException instead of willNotWorkOnGpu if g fails GPU tagging (dead code in practice given the decomposer invariant), and a minor resource-leak edge case when GpuColumnVector.from throws after mergeNulls. Neither represents a present defect on any tested code path.

sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala — GpuArrayAggregateMeta.convertToGpuImpl and the mergeNulls call site in GpuArrayAggregate.columnarEval

Important Files Changed

| Filename | Overview |
|---|---|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala | Adds GpuArrayAggregate, the AggOp trait + 6 case objects, ArrayAggregateDecomposer, and GpuArrayAggregateMeta; refactors GpuArrayTransformBase into GpuArrayTransformBase + GpuArrayElementWiseTransform. Two P2 issues: convertToGpuImpl throws IllegalStateException instead of calling willNotWorkOnGpu, and the mergeNulls result is not closed on exception. |
| integration_tests/src/main/python/higher_order_functions_test.py | Adds comprehensive GPU/CPU equality tests and fallback tests for all 6 ops, shape variations, null/empty arrays, struct fields, binary elements, decimal SUM, and every documented rejection path. Uses assert_gpu_and_cpu_are_equal_collect and assert_gpu_fallback_collect correctly. |
| tests/src/test/scala/com/nvidia/spark/rapids/ArrayAggregateDecomposerSuite.scala | New unit suite with 24 cases covering all op shapes, commutation, Cast-unwrap, If/CaseWhen lift, and every rejection path. Extends GpuUnitTests for shim-agnostic Add/Multiply defaults. |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | Registers ArrayAggregate in GpuOverrides with the correct TypeSig (commonCudfTypes + DECIMAL_128 for output; the argument allows ARRAY.nested with BINARY + STRUCT + NULL). |

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[ArrayAggregate SQL node] --> B[GpuArrayAggregateMeta.tagExprForGpu]
    B --> C{ArrayAggregateDecomposer.decompose}
    C -->|Left: reason| D[willNotWorkOnGpu → CPU fallback]
    C -->|Right: Decomposition| E{SumOp/ProductOp on Float/Double?}
    E -->|variableFloatAgg=false| D
    E -->|OK| F[decomposition = Some d]
    F --> G[convertToGpuImpl]
    G --> H[Wrap d.g as gLambda x→g x]
    H --> I[GpuArrayAggregate columnarEval]
    I --> J[Step 1: makeElementProjectBatch + g eval + listReduce]
    J --> K[Step 2: substituteMask → ifElse identity]
    K --> L[Step 3: combineWithZero via op]
    L --> M{arg has nulls?}
    M -->|Yes| N[Step 4: mergeNulls restores null list rows]
    M -->|No| O[Return result]
    N --> O
```


cuDF's segmented max / min and the combineWithZero compare + ifElse both follow
IEEE 754, where fmax(NaN, x) = x (NaN is absorbed). Spark's Greatest / Least use
Double.compare, which treats NaN as larger than every other value and propagates
it. For an array column containing NaN, GPU would return a non-NaN result while
CPU would return NaN — a data-correctness divergence flagged on the PR.

Since customer workloads for ArrayAggregate MAX / MIN are integral-typed, take
the conservative route: narrow ExtremumOp.supportsType to
{Byte, Short, Int, Long} and fall back to CPU on Float / Double. Precise
NaN propagation would require two extra segmented reduces per batch and
explicit NaN handling in combineWithZero; leaving that for a follow-up if a
real workload needs it.

Added an integration test that verifies the Float / Double fallback.
@thirtiseven
Collaborator Author

Thanks @greptile-apps for catching the NaN divergence. Pushed a conservative fix: ExtremumOp.supportsType now only accepts ByteType / ShortType / IntegerType / LongType, so Float / Double MAX / MIN fall back to CPU. Added an integration test that exercises the fallback.

Precise NaN propagation on the GPU would need two extra segmented reduces per batch (for any(isNan)) plus explicit NaN handling in combineWithZero, which adds overhead to the non-NaN common case. Since customer workloads for ArrayAggregate MAX / MIN are all integral-typed today, keeping it as a restriction is preferable to shipping a more complex and slower path. This whole PR is already explicitly a partial implementation; float/double extremum fits with the other scoped-out cases (top-level Cast, non-identity finish, nullable-element ALL/ANY).
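
For the record, the CPU side of the divergence is one line of JVM behaviour (the GPU side follows IEEE fmax, shown in the comment):

```scala
// Spark's Greatest/Least order doubles with java.lang.Double.compare,
// where NaN sorts above everything, so NaN wins MAX and propagates:
assert(java.lang.Double.compare(Double.NaN, Double.MaxValue) > 0)
// cuDF follows IEEE 754 fmax, which absorbs NaN: fmax(NaN, 1.0) == 1.0,
// so GPU MAX over array(NaN, 1.0) would yield 1.0 while CPU yields NaN.
```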

@greptile-apps
Contributor

greptile-apps Bot commented Apr 23, 2026

The fix looks correct. ExtremumOp.supportsType now gates cleanly on ByteType | ShortType | IntegerType | LongType, the tagExprForGpu path will call willNotWorkOnGpu for Float/Double before convertToGpuImpl is ever reached, and the fallback test covers the expected behavior.

One minor nit: MaxOp.identityScalar and MinOp.identityScalar still contain FloatType/DoubleType arms (Float.NegativeInfinity / Double.PositiveInfinity etc.) that are now unreachable. They're harmless, but if you want to keep the code honest you could drop those arms — or leave a // dead until float/double NaN handling is added comment. Either way, not a blocker.

The reasoning in the scaladoc (fmax(NaN, x) = x vs Double.compare propagation, plus the combineWithZero ifElse breakage) is exactly the right level of detail to make the restriction self-documenting for whoever tackles it later. LGTM from my end.


ExtremumOp.supportsType already rejects Float/Double, so the per-type cascade
in MaxOp.identityScalar and MinOp.identityScalar will never see them today.
Remove the dead arms to keep the code honest; they can be added back when a
follow-up adds real NaN propagation.
Previously `nullable = argument.nullable`, which is incorrect when the outer
list is non-nullable but its elements can be null. For INCLUDE-policy ops
(SUM / PRODUCT), a null element anywhere in a non-null list poisons the
accumulator and yields a null output row. Reporting nullable=false in that
case can let the Spark optimizer elide null checks and cause silent wrong
results downstream.

Spark's own ArrayAggregate.nullable returns `argument.nullable ||
finish.nullable`, and the finish lambda's acc variable is always bound with
nullable=true (see ArrayAggregate.bind's `zero.dataType -> true`), so the
CPU side is effectively always true. Match that.
@thirtiseven
Collaborator Author

Good catch, pushed a fix. nullable now returns true unconditionally, matching Spark's ArrayAggregate.nullable = argument.nullable || finish.nullable — the finish lambda's acc variable is bound with nullable = true in ArrayAggregate.bind (zero.dataType -> true), so the CPU side is effectively always true. That also correctly reflects the INCLUDE-policy poisoning case (null element in a non-null list ⇒ null row) you flagged.

… int tests

- AllOp / AnyOp combineWithZero now pass outDType to cuDF's and / or (ProductOp
  and SumOp were already doing this via add / mul). MaxOp / MinOp use ifElse,
  which has no outType argument; the output type there is determined by the
  inputs (both reduced and zero carry outDType already).

- ArrayAggregateDecomposition now stores the g sub-expression directly instead
  of a gChildIndex. convertToGpuImpl locates the GPU g via fastEquals under the
  merge body's meta children rather than positional indexing, so we don't rely
  on the Add / Multiply / And / Or / Greatest / Least meta-children happening
  to be laid out as [left, right]. Decomposer unit tests assert on g identity.

- Each val-chain boundary in columnarEval is now wrapped in closeOnExcept(x) {
  _ => withResource(x) { ... } } so the transitional window between a step's
  result being assigned and the next withResource taking ownership is covered.
  cuDF's ColumnVector.close is refcount-based, so the rare double-close on
  exception paths is benign.

- Added a parametric native-integer integration test hitting int / long SUM,
  int MAX, and long MIN without the Cast-to-BIGINT that the existing numeric
  test uses, exercising identityScalar / combineWithZero on the primitive
  types directly.
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
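
The boundary pattern from the third bullet above, in isolation (Arm helpers inlined as stand-ins so the sketch is self-contained):

```scala
// Stand-ins for spark-rapids' Arm helpers.
def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
  try block(r) finally r.close()
def closeOnExcept[T <: AutoCloseable, V](r: T)(block: T => V): V =
  try block(r) catch { case e: Throwable => r.close(); throw e }

// If anything throws after `step` is assigned but before withResource
// takes ownership, closeOnExcept frees it; on the normal path only
// withResource closes it. The rare double-close on exception paths is
// benign because cuDF column close is refcount-based.
def boundary[T <: AutoCloseable, V](step: T)(use: T => V): V =
  closeOnExcept(step) { _ => withResource(step)(use) }
```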
@thirtiseven thirtiseven marked this pull request as draft April 23, 2026 10:03
@thirtiseven thirtiseven self-assigned this Apr 28, 2026
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven
Collaborator Author

Filed follow-up performance issue #14711

@thirtiseven thirtiseven marked this pull request as ready for review April 30, 2026 09:35
…gg.enabled

cuDF's parallel tree-reduction sums in a different order than Spark's sequential
left-fold, so GPU vs CPU can differ in the low bits on Float/Double. Reuse the
same conf gate as scalar GpuSum/GpuAverage (spark.rapids.sql.variableFloatAgg.enabled)
via GpuOverrides.checkAndTagFloatAgg in GpuArrayAggregateMeta. Default true
matches the global policy. Added integration test for the conf=false fallback path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
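
The underlying order sensitivity is ordinary floating-point non-associativity, nothing cuDF-specific:

```scala
// A sequential left fold and a regrouped (tree-style) sum can differ
// in the low bits because FP addition is not associative:
val leftFold  = (0.1 + 0.2) + 0.3 // 0.6000000000000001
val regrouped = 0.1 + (0.2 + 0.3) // 0.6
assert(leftFold != regrouped)
```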
